package com.dmp.total

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark job that converts comma-separated text logs into a Snappy-compressed
  * Parquet dataset.
  *
  * Each input line is split on ","; lines with fewer than 85 fields are dropped.
  * The first 85 fields are converted according to [[Txt2Parquet.LogSchema]] and
  * written to the output path, replacing any data that already exists there.
  *
  * Arguments: `dataInputPath dataOutputPath`.
  */
object Txt2Parquet {

  /** Process exit code used when the command-line arguments are invalid. */
  private val InvalidArgsExitCode = 101

  /**
    * Output schema. Column `i` is built from the `i`-th comma-separated field,
    * so the order here must match the field order of the raw log.
    */
  private val LogSchema: StructType = StructType(
    Seq(
      StructField("sessionid", StringType),
      StructField("advertisersid", IntegerType),
      StructField("adorderid", IntegerType),
      StructField("adcreativeid", IntegerType),
      StructField("adplatformproviderid", IntegerType),
      StructField("sdkversion", StringType),
      StructField("adplatformkey", StringType),
      StructField("putinmodeltype", IntegerType),
      StructField("requestmode", IntegerType),
      StructField("adprice", DoubleType),
      StructField("adppprice", DoubleType),
      StructField("requestdate", StringType),
      StructField("ip", StringType),
      StructField("appid", StringType),
      StructField("appname", StringType),
      StructField("uuid", StringType),
      StructField("device", StringType),
      StructField("client", IntegerType),
      StructField("osversion", StringType),
      StructField("density", StringType),
      StructField("pw", IntegerType),
      StructField("ph", IntegerType),
      StructField("long", StringType),
      StructField("lat", StringType),
      StructField("provincename", StringType),
      StructField("cityname", StringType),
      StructField("ispid", IntegerType),
      StructField("ispname", StringType),
      StructField("networkmannerid", IntegerType),
      StructField("networkmannername", StringType),
      StructField("iseffective", IntegerType),
      StructField("isbilling", IntegerType),
      StructField("adspacetype", IntegerType),
      StructField("adspacetypename", StringType),
      StructField("devicetype", IntegerType),
      StructField("processnode", IntegerType),
      StructField("apptype", IntegerType),
      StructField("district", StringType),
      StructField("paymode", IntegerType),
      StructField("isbid", IntegerType),
      StructField("bidprice", DoubleType),
      StructField("winprice", DoubleType),
      StructField("iswin", IntegerType),
      StructField("cur", StringType),
      StructField("rate", DoubleType),
      StructField("cnywinprice", DoubleType),
      StructField("imei", StringType),
      StructField("mac", StringType),
      StructField("idfa", StringType),
      StructField("openudid", StringType),
      StructField("androidid", StringType),
      StructField("rtbprovince", StringType),
      StructField("rtbcity", StringType),
      StructField("rtbdistrict", StringType),
      StructField("rtbstreet", StringType),
      StructField("storeurl", StringType),
      StructField("realip", StringType),
      StructField("isqualityapp", IntegerType),
      StructField("bidfloor", DoubleType),
      StructField("aw", IntegerType),
      StructField("ah", IntegerType),
      StructField("imeimd5", StringType),
      StructField("macmd5", StringType),
      StructField("idfamd5", StringType),
      StructField("openudidmd5", StringType),
      StructField("androididmd5", StringType),
      StructField("imeisha1", StringType),
      StructField("macsha1", StringType),
      StructField("idfasha1", StringType),
      StructField("openudidsha1", StringType),
      StructField("androididsha1", StringType),
      StructField("uuidunknow", StringType),
      StructField("userid", StringType),
      StructField("iptype", IntegerType),
      StructField("initbidprice", DoubleType),
      StructField("adpayment", DoubleType),
      StructField("agentrate", DoubleType),
      StructField("lrate", DoubleType),
      StructField("adxrate", DoubleType),
      StructField("title", StringType),
      StructField("keywords", StringType),
      StructField("tagid", StringType),
      StructField("callbackdate", StringType),
      StructField("channelid", StringType),
      StructField("mediatype", IntegerType)
    )
  )

  /** Column data types in schema order; drives the per-field conversion. */
  private val ColumnTypes: Array[DataType] = LogSchema.fields.map(_.dataType)

  /**
    * Converts one raw field to the value stored in the column of the given type.
    *
    * Integer and double columns go through the `toIntPlus` / `toDoublePlus`
    * extension methods from `WenString`; every other column keeps the raw string.
    * NOTE(review): `toIntPlus` / `toDoublePlus` are defined outside this file —
    * presumably lenient parsers; confirm how they treat empty or malformed input.
    */
  private def convert(raw: String, dataType: DataType): Any = {
    // Brings the String extension methods into scope.
    import com.oracle.beans.WenString._
    dataType match {
      case IntegerType => raw.toIntPlus
      case DoubleType  => raw.toDoublePlus
      case _           => raw
    }
  }

  /**
    * Builds a Row from the leading fields of a split log line, one value per
    * schema column. Callers must ensure `fields` has at least as many elements
    * as the schema has columns; any extra trailing fields are ignored.
    */
  private def toRow(fields: Array[String]): Row =
    Row.fromSeq(ColumnTypes.indices.map(i => convert(fields(i), ColumnTypes(i))))

  /**
    * Entry point.
    *
    * @param args exactly two elements: the input path of the raw logs and the
    *             output path for the Parquet data; otherwise usage is printed
    *             and the process exits with code 101
    */
  def main(args: Array[String]): Unit = {
    // Validate arguments.
    if (args.length != 2) {
      println(
        """
          |Usage:
          | com.dmp.total.Txt2Parquet
          | args:
          |     dataInputPath:  原始日志输入路径
          |     dataOutputPath: parquet存储路径
        """.stripMargin)
      sys.exit(InvalidArgsExitCode) // observable by the caller as the exit status
    }

    // Destructure the two positional arguments.
    val Array(dataPath, outPath) = args

    val sparkConf = new SparkConf()
    sparkConf.setAppName("日志转parquet")
    // Default to local mode only when no master was supplied (e.g. via
    // spark-submit --master), so the job is not pinned to local execution.
    sparkConf.setIfMissing("spark.master", "local[*]")
    // Serializer used by the Spark application.
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val sc = new SparkContext(sparkConf)
    try {
      val rawData: RDD[String] = sc.textFile(dataPath)

      // split with limit -1 keeps trailing empty fields, so the length check
      // counts every column even when the last ones are blank.
      val expectedFields = ColumnTypes.length
      val rowData: RDD[Row] = rawData
        .map(_.split(",", -1))
        .filter(_.length >= expectedFields)
        .map(toRow)

      val sqlContext = new SQLContext(sc)
      // Compression codec for the Parquet output.
      sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

      val dataFrame = sqlContext.createDataFrame(rowData, LogSchema)

      // Remove any existing output first. The file system is resolved from the
      // output path itself, so a fully qualified URI (hdfs://..., file:///...)
      // works even when its scheme differs from the configured fs.defaultFS.
      val path = new Path(outPath)
      val fs: FileSystem = path.getFileSystem(sc.hadoopConfiguration)
      if (fs.exists(path)) {
        fs.delete(path, true)
      }

      // Write the data.
      dataFrame.write.parquet(outPath)
    } finally {
      // Always release the context, even if the job fails part-way.
      sc.stop()
    }
  }

}
